package org.example.hive;

import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.example.table.HiveCurveData;

import java.util.concurrent.atomic.AtomicReference;

/**
 * @author lwc
 * @description: TODO
 * @date 2023/11/29 14:40
 */
@Slf4j
public class HiveSelectExample {


    public static void HiveSyncDws(SparkSession spark, SparkConf sparkConf) {


        // 查询 Hive 表数据
        log.info("=================开始查询数据===================");
        Dataset<Row> databases = spark.sql("show databases");
        log.info("databases:{}", databases);
        databases.show();
        Dataset<Row> use = spark.sql("use ods_amr20_hbase");
        log.info("use:{}", use);
        Dataset<Row> tables = spark.sql("show tables");
        tables.show();
        String startDate = DateUtil.beginOfMonth(DateUtil.date()).toString("yyyyMMdd");
        // 获取三个月后的日期
        String endDate = DateUtil.offsetMonth(DateUtil.parse(DateUtil.beginOfMonth(DateUtil.date()).toDateStr()), -3).toString("yyyyMMdd");

//        String sql = sparkConf.get("spark.app.sql","SELECT count(*) FROM ods_amr20_hbase.e_mp_u_curve where ds between '"+startDate+"' and '"+endDate+"'");
//        log.info("我输入的sql为：{}",sql);
//        Dataset<Row> sql1 = spark.sql(sql);
//        sql1.show();
        String table = sparkConf.get("spark.app.table", "e_mp_u_curve");
        log.info("输入的表名称为：{}", table);
        StringBuilder sql = new StringBuilder();
        sql.append("select count(*) as count from ods_amr20_hbase.")
                .append(table).append(" where ds between '").append(endDate).append("' and '").append(startDate).append("'");
        Dataset<Row> dataset = spark.sql(sql.toString());
//        Dataset<Row> dataset = spark.sql("select count(*) as count from ods_amr20_hbase." + table);
        dataset.show();
        AtomicReference<Long> count = new AtomicReference<>(0L);
        dataset.foreach(data -> {
            count.set(data.getLong(0));
        });
        log.info("获取到表为：{},count为：{}", table, count);
//        Dataset<Row> hiveData = spark.sql("select * from ods_amr20_hbase." + table + " where ds between '" + endDate + "' and '" + startDate + "'");
////        Dataset<Row> hiveData = spark.sql("select * from ods_amr20_hbase.e_mp_u_curve");
//        hiveData.foreach(row -> {
//            String devId = row.getString(row.fieldIndex("dev_id"));
//            String ds = row.getString(row.fieldIndex("ds"));
//            List<Row> rows = new ArrayList<>();
//            // 遍历u0000, u0001, u00015等列，生成新行
//            for (int i = 0; i <= 2360; i=i+15) {
//                String columnName = "u" + String.format("%04d", i);
//                String dataVs = row.getString(row.fieldIndex(columnName));
//
//                Row newRow = RowFactory.create(devId, ds, columnName, dataVs);
//                rows.add(newRow);
//            }
//
//        });
        log.info("=================查询完毕===================");
//        // 配置MySQL连接信息
//        String jdbcUrl = "jdbc:mysql://localhost:3306/test";
//        Properties connectionProperties = new Properties();
//        connectionProperties.setProperty("user", "root");
//        connectionProperties.setProperty("password", "9XME3z94xs9nhCj");
//
//        // 将结果写入MySQL表
////        result = result.withColumnRenamed("id", "userid")
////                .withColumnRenamed("name","picture");
//        result.write()
//                .mode("append") // 或者使用"append"，取决于你的需求
//                .jdbc(jdbcUrl, "storeup", connectionProperties);

        // 停止 SparkSession
        spark.stop();
    }

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("Spark HBase Example");
        SparkSession spark = null;
        if (args.length >= 1) {
            spark = SparkSession.builder()
                    .appName("HBase to MySQL Sync")
                    .config(sparkConf)
//                    .config("spark.master", "local[*]")  // 使用本地模式，[*]表示使用所有可用的核心
                    .enableHiveSupport()
                    .getOrCreate();
            log.info("加上了local[*]");
        } else {
            spark = SparkSession.builder()
                    .appName("HBase to MySQL Sync")
                    .config(sparkConf)
                    .config("spark.master", "local[*]")  // 使用本地模式，[*]表示使用所有可用的核心
                    .enableHiveSupport()
                    .getOrCreate();
        }
//        HiveSyncDws(spark, sparkConf);
        HiveCurveData.Data(spark, sparkConf);
        spark.stop();
    }


}
